/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.query.h2.twostep;

import java.lang.reflect.Constructor;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.cache.CacheException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2;
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
import org.apache.ignite.internal.processors.query.h2.GridH2ResultSetIterator;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlType;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryRequest;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.h2.command.ddl.CreateTableData;
import org.h2.engine.Session;
import org.h2.index.Cursor;
import org.h2.jdbc.JdbcConnection;
import org.h2.jdbc.JdbcResultSet;
import org.h2.jdbc.JdbcStatement;
import org.h2.result.ResultInterface;
import org.h2.result.Row;
import org.h2.table.Column;
import org.h2.util.IntArray;
import org.h2.value.Value;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;

import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;

/**
 * Reduce query executor.
 */
public class GridReduceQueryExecutor {
    /** Thread pool to process query messages. */
    public static final byte QUERY_POOL = GridIoPolicy.SYSTEM_POOL;

    /** Kernal context; assigned in {@link #start(GridKernalContext, IgniteH2Indexing)}. */
    private GridKernalContext ctx;

    /** H2 indexing; assigned in {@link #start(GridKernalContext, IgniteH2Indexing)}. */
    private IgniteH2Indexing h2;

    /** Logger; obtained from the kernal context in {@link #start(GridKernalContext, IgniteH2Indexing)}. */
    private IgniteLogger log;

    /** Generator of query request IDs; incremented once per query attempt. */
    private final AtomicLong reqIdGen = new AtomicLong();

    /** Query runs registered for the duration of an attempt, keyed by query request ID. */
    private final ConcurrentMap<Long, QueryRun> runs = new ConcurrentHashMap8<>();

    /** Fake tables by table index; the list is never grown in place, a larger copy is published instead. */
    private volatile List<GridThreadLocalTable> fakeTbls = Collections.emptyList();

    /** Held while a new fake table is created and the grown {@link #fakeTbls} list is published. */
    private final Lock fakeTblsLock = new ReentrantLock();

    /** {@link JdbcResultSet} constructor looked up reflectively and forced accessible. */
    private static final Constructor<JdbcResultSet> CONSTRUCTOR;

    /**
     * Resolves the result set constructor once at class load time. A missing constructor
     * means the H2 library on the classpath has an unexpected signature.
     */
    static {
        Constructor<JdbcResultSet> ctor;

        try {
            ctor = JdbcResultSet.class.getDeclaredConstructor(
                JdbcConnection.class,
                JdbcStatement.class,
                ResultInterface.class,
                int.class,
                boolean.class,
                boolean.class,
                boolean.class
            );
        }
        catch (NoSuchMethodException e) {
            throw new IllegalStateException("Check H2 version in classpath.", e);
        }

        ctor.setAccessible(true);

        CONSTRUCTOR = ctor;
    }

    /** Busy lock entered around processing of every incoming query message; messages are dropped if it cannot be entered. */
    private final GridSpinBusyLock busyLock;

    /**
     * Creates the executor. Context, indexing and logger are assigned later in
     * {@link #start(GridKernalContext, IgniteH2Indexing)}.
     *
     * @param busyLock Busy lock.
     */
    public GridReduceQueryExecutor(GridSpinBusyLock busyLock) {
        this.busyLock = busyLock;
    }

    /**
     * Stores the context and indexing, then registers two listeners: a {@code TOPIC_QUERY} message listener
     * which delegates to {@link #onMessage(UUID, Object)} under the busy lock, and a local listener for
     * node failed/left events which notifies every query run that has the departed node as a source.
     *
     * @param ctx Context.
     * @param h2 H2 Indexing.
     * @throws IgniteCheckedException If failed.
     */
    public void start(final GridKernalContext ctx, final IgniteH2Indexing h2) throws IgniteCheckedException {
        this.ctx = ctx;
        this.h2 = h2;

        log = ctx.log(GridReduceQueryExecutor.class);

        GridMessageListener msgLsnr = new GridMessageListener() {
            @Override public void onMessage(UUID nodeId, Object msg) {
                if (busyLock.enterBusy()) {
                    try {
                        GridReduceQueryExecutor.this.onMessage(nodeId, msg);
                    }
                    finally {
                        busyLock.leaveBusy();
                    }
                }
            }
        };

        ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, msgLsnr);

        GridLocalEventListener discoLsnr = new GridLocalEventListener() {
            @Override public void onEvent(final Event evt) {
                UUID goneId = ((DiscoveryEvent)evt).eventNode().id();

                for (QueryRun run : runs.values()) {
                    boolean affected = false;

                    // A run is affected as soon as any of its merge indexes reads from the departed node.
                    for (GridMergeIndex idx : run.idxs) {
                        if (idx.hasSource(goneId)) {
                            affected = true;

                            break;
                        }
                    }

                    if (affected)
                        handleNodeLeft(run, goneId);
                }
            }
        };

        ctx.event().addLocalEventListener(discoLsnr, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
    }

    /**
     * Reacts to a source node leaving by asking the run to retry on the currently ready topology version.
     *
     * @param r Query run.
     * @param nodeId Left node ID.
     */
    private void handleNodeLeft(QueryRun r, UUID nodeId) {
        // Will attempt to retry. If reduce query was started it will fail on next page fetching.
        AffinityTopologyVersion readyVer = h2.readyTopologyVersion();

        retry(r, readyVer, nodeId);
    }

    /**
     * Dispatches a reduce-side response (next page or failure) received from the given node.
     * Messages from nodes no longer known to discovery and messages of other types are ignored.
     * Any error thrown while processing is logged and not propagated.
     *
     * @param nodeId Node ID.
     * @param msg Message.
     */
    public void onMessage(UUID nodeId, Object msg) {
        try {
            assert msg != null;

            ClusterNode sender = ctx.discovery().node(nodeId);

            if (sender == null)
                return; // Node left, ignore.

            if (msg instanceof GridQueryNextPageResponse)
                onNextPage(sender, (GridQueryNextPageResponse)msg);
            else if (msg instanceof GridQueryFailResponse)
                onFail(sender, (GridQueryFailResponse)msg);
            else
                return; // Not a message for the reducer, nothing was processed.

            if (log.isDebugEnabled())
                log.debug("Processed response: " + nodeId + "->" + ctx.localNodeId() + " " + msg);
        }
        catch(Throwable th) {
            U.error(log, "Failed to process message: " + msg, th);
        }
    }

    /**
     * Handles a failure response: looks up the query run by request ID and marks it as failed.
     *
     * @param node Node.
     * @param msg Message.
     */
    private void onFail(ClusterNode node, GridQueryFailResponse msg) {
        // The run may be absent already; fail() tolerates null.
        fail(runs.get(msg.queryRequestId()), node.id(), msg.error());
    }

    /**
     * Puts the query run into the error state. Does nothing when the run is {@code null}.
     *
     * @param r Query run.
     * @param nodeId Failed node ID.
     * @param msg Error message.
     */
    private void fail(QueryRun r, UUID nodeId, String msg) {
        if (r == null)
            return;

        CacheException err = new CacheException("Failed to execute map query on the node: " + nodeId + ", " + msg);

        r.state(err, nodeId);
    }

    /**
     * Handles a result page received from a map node: wraps it into a {@link GridResultPage} able to request
     * the following page from the same node, adds it to the merge index of the corresponding map query and
     * then either triggers a retry (if the node asked for it) or counts down the run latch for a first page.
     *
     * @param node Node.
     * @param msg Message.
     */
    private void onNextPage(final ClusterNode node, GridQueryNextPageResponse msg) {
        final long reqId = msg.queryRequestId();
        final int qryIdx = msg.query();

        final QueryRun run = runs.get(reqId);

        if (run == null) // Already finished with error or canceled.
            return;

        final int pageSz = run.pageSize;

        GridMergeIndex mergeIdx = run.idxs.get(qryIdx);

        GridResultPage page;

        try {
            page = new GridResultPage(ctx, node.id(), msg) {
                @Override public void fetchNextPage() {
                    Object st = run.state.get();

                    if (st != null) {
                        // A non-exception state (retry request) also stops fetching, but has nothing to attach.
                        if (!(st instanceof CacheException))
                            throw new CacheException("Failed to fetch data from node: " + node.id());

                        CacheException stErr = (CacheException)st;

                        // Client disconnect is rethrown as is.
                        if (stErr.getCause() instanceof IgniteClientDisconnectedException)
                            throw stErr;

                        CacheException fetchErr = new CacheException("Failed to fetch data from node: " + node.id());

                        fetchErr.addSuppressed(stErr);

                        throw fetchErr;
                    }

                    GridQueryNextPageRequest nextReq = new GridQueryNextPageRequest(reqId, qryIdx, pageSz);

                    try {
                        if (!node.isLocal())
                            ctx.io().send(node, GridTopic.TOPIC_QUERY, nextReq, QUERY_POOL);
                        else
                            h2.mapQueryExecutor().onMessage(ctx.localNodeId(), nextReq);
                    }
                    catch (IgniteCheckedException e) {
                        throw new CacheException("Failed to fetch data from node: " + node.id(), e);
                    }
                }
            };
        }
        catch (Exception e) {
            U.error(log, "Error in message.", e);

            fail(run, node.id(), "Error in message.");

            return;
        }

        mergeIdx.addPage(page);

        AffinityTopologyVersion retryVer = msg.retry();

        if (retryVer != null) {
            retry(run, retryVer, node.id());

            return;
        }

        if (msg.allRows() != -1) // Only the first page contains row count.
            run.latch.countDown();
    }

    /**
     * Passes the retry topology version to the run's state. {@code query(...)} treats a state of type
     * {@link AffinityTopologyVersion} as a request to repeat the attempt.
     *
     * @param r Query run.
     * @param retryVer Retry version.
     * @param nodeId Node ID.
     */
    private void retry(QueryRun r, AffinityTopologyVersion retryVer, UUID nodeId) {
        r.state(retryVer, nodeId);
    }

    /**
     * Checks whether the main cache or any of the extra caches currently has moving partitions.
     * The check stops at the first cache which reports moving partitions.
     *
     * @param cctx Cache context for main space.
     * @param extraSpaces Extra spaces.
     * @return {@code true} If preloading is active.
     */
    private boolean isPreloadingActive(final GridCacheContext<?,?> cctx, List<String> extraSpaces) {
        boolean active = hasMovingPartitions(cctx);

        if (!active && extraSpaces != null) {
            Iterator<String> it = extraSpaces.iterator();

            while (!active && it.hasNext())
                active = hasMovingPartitions(cacheContext(it.next()));
        }

        return active;
    }

    /**
     * Checks whether any node in the full partition map of the given cache reports moving partitions.
     * <p>
     * Local caches are reported as having no moving partitions, consistently with the data node
     * calculations in this class which skip local caches.
     *
     * @param cctx Cache context.
     * @return {@code true} If cache context has moving partitions.
     */
    private boolean hasMovingPartitions(GridCacheContext<?,?> cctx) {
        // This method is reached for every extra space, including local caches.
        // NOTE(review): presumably partition topology is not available for local caches - confirm.
        if (cctx.isLocal())
            return false;

        GridDhtPartitionFullMap fullMap = cctx.topology().partitionMap(false);

        for (GridDhtPartitionMap2 map : fullMap.values()) {
            if (map.hasMovingPartitions())
                return true;
        }

        return false;
    }

    /**
     * Looks up the internal cache with the given name and returns its context.
     * NOTE(review): the looked-up cache is dereferenced without a null check - presumably the cache
     * is expected to exist at this point; confirm against callers.
     *
     * @param name Cache name.
     * @return Cache context.
     */
    private GridCacheContext<?,?> cacheContext(String name) {
        return ctx.cache().internalCache(name).context();
    }

    /**
     * Calculates the nodes to run the map queries on when no preloading was detected by the caller.
     * Starts from the affinity nodes of the main cache and validates them against the affinity nodes of
     * every non-local extra cache:
     * <ul>
     * <li>replicated + replicated: the node sets are intersected and the intersection must be non-empty;</li>
     * <li>partitioned main + replicated extra: the extra cache must cover all nodes of the main cache;</li>
     * <li>partitioned + partitioned: the node sets must be equal.</li>
     * </ul>
     * A replicated main cache joined with a partitioned extra cache is rejected.
     *
     * @param topVer Topology version.
     * @param cctx Cache context for main space.
     * @param extraSpaces Extra spaces.
     * @return Data nodes or {@code null} if repartitioning started and we need to retry.
     */
    private Collection<ClusterNode> stableDataNodes(
        AffinityTopologyVersion topVer,
        final GridCacheContext<?,?> cctx,
        List<String> extraSpaces
    ) {
        String mainSpace = cctx.name();

        Set<ClusterNode> nodes = new HashSet<>(dataNodes(mainSpace, topVer));

        if (F.isEmpty(nodes))
            throw new CacheException("Failed to find data nodes for cache: " + mainSpace);

        if (F.isEmpty(extraSpaces))
            return nodes;

        for (String extraSpace : extraSpaces) {
            GridCacheContext<?,?> extraCctx = cacheContext(extraSpace);

            if (extraCctx.isLocal())
                continue; // No consistency guaranties for local caches.

            boolean mainRepl = cctx.isReplicated();
            boolean extraRepl = extraCctx.isReplicated();

            if (mainRepl && !extraRepl)
                throw new CacheException("Queries running on replicated cache should not contain JOINs " +
                    "with partitioned tables [rCache=" + cctx.name() + ", pCache=" + extraSpace + "]");

            Collection<ClusterNode> extraNodes = dataNodes(extraSpace, topVer);

            if (F.isEmpty(extraNodes))
                throw new CacheException("Failed to find data nodes for cache: " + extraSpace);

            boolean mismatch;

            if (mainRepl) {
                // Replicated main with partitioned extra was rejected above, so this can not happen.
                if (!extraRepl)
                    throw new IllegalStateException();

                nodes.retainAll(extraNodes);

                mismatch = nodes.isEmpty();
            }
            else if (extraRepl)
                mismatch = !extraNodes.containsAll(nodes);
            else
                mismatch = extraNodes.size() != nodes.size() || !nodes.containsAll(extraNodes);

            if (mismatch) {
                // The mismatch may be caused by a rebalancing which started meanwhile.
                if (isPreloadingActive(cctx, extraSpaces))
                    return null; // Retry.

                throw new CacheException("Caches have distinct sets of data nodes [cache1=" + cctx.name() +
                    ", cache2=" + extraSpace + "]");
            }
        }

        return nodes;
    }

    /**
     * Runs a two-step query: sends the map queries to the data nodes, waits for the first page from every
     * node for every map query and then either collects the rows directly (when the merge table is skipped)
     * or runs the reduce query over the merge tables.
     * <p>
     * The whole procedure is repeated in a loop with a growing pause between attempts while data nodes can
     * not be calculated, the request can not be sent, or a map node asks to retry on a newer topology version.
     *
     * @param cctx Cache context.
     * @param qry Query.
     * @param keepBinary Keep binary.
     * @return Cursor.
     * @throws CacheException If the query failed, was interrupted or the client node got disconnected.
     */
    public Iterator<List<?>> query(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry, boolean keepBinary) {
        for (int attempt = 0;; attempt++) {
            if (attempt != 0) {
                try {
                    Thread.sleep(attempt * 10); // Wait for exchange.
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();

                    throw new CacheException("Query was interrupted.", e);
                }
            }

            // Every attempt gets its own request ID and its own run.
            long qryReqId = reqIdGen.incrementAndGet();

            QueryRun r = new QueryRun();

            r.pageSize = qry.pageSize() <= 0 ? GridCacheTwoStepQuery.DFLT_PAGE_SIZE : qry.pageSize();

            r.idxs = new ArrayList<>(qry.mapQueries().size());

            String space = cctx.name();

            r.conn = (JdbcConnection)h2.connectionForSpace(space);

            AffinityTopologyVersion topVer = h2.readyTopologyVersion();

            List<String> extraSpaces = extraSpaces(space, qry.spaces());

            Collection<ClusterNode> nodes;

            // Explicit partition mapping for unstable topology.
            Map<ClusterNode, IntArray> partsMap = null;

            if (isPreloadingActive(cctx, extraSpaces)) {
                if (cctx.isReplicated())
                    nodes = replicatedUnstableDataNodes(cctx, extraSpaces);
                else {
                    partsMap = partitionedUnstableDataNodes(cctx, extraSpaces);

                    nodes = partsMap == null ? null : partsMap.keySet();
                }
            }
            else
                nodes = stableDataNodes(topVer, cctx, extraSpaces);

            if (nodes == null)
                continue; // Retry.

            assert !nodes.isEmpty();

            if (cctx.isReplicated() || qry.explain()) {
                assert qry.explain() || !nodes.contains(ctx.discovery().localNode()) :
                    "We must be on a client node.";

                // Select random data node to run query on a replicated data or get EXPLAIN PLAN from a single node.
                nodes = Collections.singleton(F.rand(nodes));
            }

            int tblIdx = 0;

            final boolean skipMergeTbl = !qry.explain() && qry.skipMergeTable();

            // One merge index per map query; every data node is a source of every index.
            for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
                GridMergeIndex idx;

                if (!skipMergeTbl) {
                    GridMergeTable tbl;

                    try {
                        tbl = createMergeTable(r.conn, mapQry, qry.explain());
                    }
                    catch (IgniteCheckedException e) {
                        throw new IgniteException(e);
                    }

                    idx = tbl.getScanIndex(null);

                    fakeTable(r.conn, tblIdx++).setInnerTable(tbl);
                }
                else
                    idx = GridMergeIndexUnsorted.createDummy();

                for (ClusterNode node : nodes)
                    idx.addSource(node.id());

                r.idxs.add(idx);
            }

            // The latch is released once the first page arrived from every node for every map query.
            r.latch = new CountDownLatch(r.idxs.size() * nodes.size());

            runs.put(qryReqId, r);

            try {
                if (ctx.clientDisconnected()) {
                    throw new CacheException("Query was cancelled, client node disconnected.",
                        new IgniteClientDisconnectedException(ctx.cluster().clientReconnectFuture(),
                        "Client node disconnected."));
                }

                Collection<GridCacheSqlQuery> mapQrys = qry.mapQueries();

                if (qry.explain()) {
                    mapQrys = new ArrayList<>(qry.mapQueries().size());

                    for (GridCacheSqlQuery mapQry : qry.mapQueries())
                        mapQrys.add(new GridCacheSqlQuery("EXPLAIN " + mapQry.query(), mapQry.parameters()));
                }

                if (nodes.size() != 1 || !F.first(nodes).isLocal()) { // Marshall params for remotes.
                    Marshaller m = ctx.config().getMarshaller();

                    for (GridCacheSqlQuery mapQry : mapQrys)
                        mapQry.marshallParams(m);
                }

                boolean retry = false;

                if (send(nodes,
                    new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer, extraSpaces, null), partsMap)) {
                    awaitAllReplies(r, nodes);

                    Object state = r.state.get();

                    if (state != null) {
                        if (state instanceof CacheException) {
                            CacheException err = (CacheException)state;

                            // Client disconnect is propagated as is, other errors are wrapped.
                            if (err.getCause() instanceof IgniteClientDisconnectedException)
                                throw err;

                            throw new CacheException("Failed to run map query remotely.", err);
                        }

                        if (state instanceof AffinityTopologyVersion) {
                            retry = true;

                            // If remote node asks us to retry then we have outdated full partition map.
                            h2.awaitForReadyTopologyVersion((AffinityTopologyVersion)state);
                        }
                    }
                }
                else // Send failed.
                    retry = true;

                Iterator<List<?>> resIter = null;

                if (!retry) {
                    if (qry.explain())
                        return explainPlan(r.conn, space, qry);

                    if (skipMergeTbl) {
                        List<List<?>> res = new ArrayList<>();

                        assert r.idxs.size() == 1 : r.idxs;

                        GridMergeIndex idx = r.idxs.get(0);

                        Cursor cur = idx.findInStream(null, null);

                        // No reduce step is needed: copy the streamed rows into plain lists.
                        while (cur.next()) {
                            Row row = cur.get();

                            int cols = row.getColumnCount();

                            List<Object> resRow  = new ArrayList<>(cols);

                            for (int c = 0; c < cols; c++)
                                resRow.add(row.getValue(c).getObject());

                            res.add(resRow);
                        }

                        resIter = res.iterator();
                    }
                    else {
                        GridCacheSqlQuery rdc = qry.reduceQuery();

                        // Statement caching is prohibited here because we can't guarantee correct merge index reuse.
                        ResultSet res = h2.executeSqlQueryWithTimer(space,
                            r.conn,
                            rdc.query(),
                            F.asList(rdc.parameters()),
                            false);

                        resIter = new Iter(res);
                    }
                }

                for (GridMergeIndex idx : r.idxs) {
                    if (!idx.fetchedAll()) { // We have to explicitly cancel queries on remote nodes.
                        // The cancel request carries only the query request ID, so a single broadcast
                        // is enough regardless of how many merge indexes are unfinished.
                        send(nodes, new GridQueryCancelRequest(qryReqId), null);

                        break;
                    }
                }

                if (retry) {
                    if (Thread.currentThread().isInterrupted())
                        throw new IgniteInterruptedCheckedException("Query was interrupted.");

                    continue;
                }

                return new GridQueryCacheObjectsIterator(resIter, cctx, keepBinary);
            }
            catch (IgniteCheckedException | RuntimeException e) {
                U.closeQuiet(r.conn);

                if (e instanceof CacheException)
                    throw (CacheException)e;

                Throwable cause = e;

                // Surface the client disconnect as the direct cause if it is present in the chain.
                if (e instanceof IgniteCheckedException) {
                    Throwable disconnectedErr =
                        ((IgniteCheckedException)e).getCause(IgniteClientDisconnectedException.class);

                    if (disconnectedErr != null)
                        cause = disconnectedErr;
                }

                throw new CacheException("Failed to run reduce query locally.", cause);
            }
            finally {
                // Runs on return, on failure and on retry: the run of this attempt is always unregistered.
                if (!runs.remove(qryReqId, r))
                    U.warn(log, "Query run was already removed: " + qryReqId);

                if (!skipMergeTbl) {
                    // All fake tables were created before entering the try block, so no connection is needed.
                    for (int i = 0, mapQrys = qry.mapQueries().size(); i < mapQrys; i++)
                        fakeTable(null, i).setInnerTable(null); // Drop all merge tables.
                }
            }
        }
    }

    /**
     * Waits on the run latch in 500 ms slices. After every slice which timed out the given nodes are checked:
     * the first node found not alive is reported via {@link #handleNodeLeft(QueryRun, UUID)} and waiting stops.
     *
     * @param r Query run.
     * @param nodes Nodes to check periodically if they alive.
     * @throws IgniteInterruptedCheckedException If interrupted.
     */
    private void awaitAllReplies(QueryRun r, Collection<ClusterNode> nodes)
        throws IgniteInterruptedCheckedException {
        for (;;) {
            if (U.await(r.latch, 500, TimeUnit.MILLISECONDS))
                return;

            for (ClusterNode n : nodes) {
                if (ctx.discovery().alive(n))
                    continue;

                handleNodeLeft(r, n.id());

                // Handling of the left node is expected to release the latch.
                assert r.latch.getCount() == 0;

                return;
            }
        }
    }

    /**
     * Returns the SQL text of the table which {@link GridSqlQuerySplitter} associates with the given index.
     *
     * @param idx Table index.
     * @return Table name.
     */
    private static String table(int idx) {
        return GridSqlQuerySplitter.table(idx).getSQL();
    }

    /**
     * Gets or creates new fake table for index. Tables are created strictly in index order: the requested
     * index must be either an existing one or the next free one.
     *
     * @param c Connection used to create the table if it does not exist yet. Callers which know that the
     *      table already exists pass {@code null}.
     * @param idx Index of table.
     * @return Table.
     */
    private GridThreadLocalTable fakeTable(Connection c, int idx) {
        List<GridThreadLocalTable> cur = fakeTbls;

        assert cur.size() >= idx;

        if (cur.size() != idx) // Table for such index already exists.
            return cur.get(idx);

        fakeTblsLock.lock();

        try {
            cur = fakeTbls;

            if (cur.size() == idx) { // Double check inside of lock.
                try (Statement stmt = c.createStatement()) {
                    stmt.executeUpdate("CREATE TABLE " + table(idx) +
                        "(fake BOOL) ENGINE \"" + GridThreadLocalTable.Engine.class.getName() + '"');
                }
                catch (SQLException e) {
                    throw new IllegalStateException(e);
                }

                // Publish a grown copy instead of modifying the list seen by concurrent readers.
                List<GridThreadLocalTable> grown = new ArrayList<>(cur.size() + 1);

                grown.addAll(cur);
                grown.add(GridThreadLocalTable.Engine.getCreated());

                fakeTbls = cur = grown;
            }
        }
        finally {
            fakeTblsLock.unlock();
        }

        return cur.get(idx);
    }

    /**
     * Calculates data nodes for replicated caches on unstable topology: the nodes owning all partitions of
     * the main cache, narrowed down to those which also own all partitions of every non-local extra cache.
     * A partitioned extra cache is rejected.
     *
     * @param cctx Cache context for main space.
     * @param extraSpaces Extra spaces.
     * @return Collection of all data nodes owning all the caches or {@code null} for retry.
     */
    private Collection<ClusterNode> replicatedUnstableDataNodes(final GridCacheContext<?,?> cctx,
        List<String> extraSpaces) {
        assert cctx.isReplicated() : cctx.name() + " must be replicated";

        Set<ClusterNode> owners = replicatedUnstableDataNodes(cctx);

        if (F.isEmpty(owners))
            return null; // Retry.

        if (F.isEmpty(extraSpaces))
            return owners;

        for (String extraSpace : extraSpaces) {
            GridCacheContext<?,?> extraCctx = cacheContext(extraSpace);

            if (extraCctx.isLocal())
                continue;

            if (!extraCctx.isReplicated())
                throw new CacheException("Queries running on replicated cache should not contain JOINs " +
                    "with tables in partitioned caches [rCache=" + cctx.name() + ", pCache=" + extraSpace + "]");

            Set<ClusterNode> extraOwners = replicatedUnstableDataNodes(extraCctx);

            if (F.isEmpty(extraOwners))
                return null; // Retry.

            // Keep only the nodes which own this extra cache as well.
            owners.retainAll(extraOwners);

            if (owners.isEmpty())
                return null; // Retry.
        }

        return owners;
    }

    /**
     * Returns the affinity nodes of the cache on the given topology version, never {@code null}.
     *
     * @param space Cache name.
     * @param topVer Topology version.
     * @return Collection of data nodes.
     */
    private Collection<ClusterNode> dataNodes(String space, AffinityTopologyVersion topVer) {
        Collection<ClusterNode> affNodes = ctx.discovery().cacheAffinityNodes(space, topVer);

        if (affNodes == null)
            return Collections.<ClusterNode>emptySet();

        return affNodes;
    }

    /**
     * Collects all the nodes owning all the partitions for the given replicated cache: starts from the
     * data nodes of the cache and intersects them with the owners of every partition.
     *
     * @param cctx Cache context.
     * @return Owning nodes or {@code null} if we can't find owners for some partitions.
     * @throws CacheException If the cache has no data nodes at all.
     */
    private Set<ClusterNode> replicatedUnstableDataNodes(GridCacheContext<?,?> cctx) {
        assert cctx.isReplicated() : cctx.name() + " must be replicated";

        String cacheName = cctx.name();

        Set<ClusterNode> candidates = new HashSet<>(dataNodes(cacheName, NONE));

        if (candidates.isEmpty())
            throw new CacheException("Failed to find data nodes for cache: " + cacheName);

        int partsCnt = cctx.affinity().partitions();

        // Find all the nodes owning all the partitions for replicated cache.
        int part = 0;

        while (part < partsCnt) {
            List<ClusterNode> partOwners = cctx.topology().owners(part);

            if (F.isEmpty(partOwners))
                return null; // Retry.

            candidates.retainAll(partOwners);

            if (candidates.isEmpty())
                return null; // Retry.

            part++;
        }

        return candidates;
    }

    /**
     * Calculates partition mapping for partitioned cache on unstable topology.
     *
     * @param cctx Cache context for main space.
     * @param extraSpaces Extra spaces.
     * @return Partition mapping or {@code null} if we can't calculate it due to repartitioning and we need to retry.
     */
    @SuppressWarnings("unchecked")
    private Map<ClusterNode, IntArray> partitionedUnstableDataNodes(final GridCacheContext<?,?> cctx,
        List<String> extraSpaces) {
        assert !cctx.isReplicated() && !cctx.isLocal() : cctx.name() + " must be partitioned";

        final int partsCnt = cctx.affinity().partitions();

        final boolean hasExtra = extraSpaces != null;

        // Every partitioned extra cache must have the same number of partitions as the main one.
        if (hasExtra) {
            for (String spaceName : extraSpaces) {
                GridCacheContext<?,?> other = cacheContext(spaceName);

                if (!other.isReplicated() && !other.isLocal()) {
                    int otherParts = other.affinity().partitions();

                    if (otherParts != partsCnt)
                        throw new CacheException("Number of partitions must be the same for correct collocation [cache1=" +
                            cctx.name() + ", parts1=" + partsCnt + ", cache2=" + spaceName + ", parts2=" + otherParts + "]");
                }
            }
        }

        // Index in this array is the partition number, value is the set of candidate nodes for it.
        Set<ClusterNode>[] ownersByPart = new Set[partsCnt];

        int mainParts = cctx.affinity().partitions();

        // Start with the owners reported by the main cache topology.
        for (int part = 0; part < mainParts; part++) {
            List<ClusterNode> partOwners = cctx.topology().owners(part);

            if (!F.isEmpty(partOwners)) {
                ownersByPart[part] = new HashSet<>(partOwners);

                continue;
            }

            // Nobody owns the partition: fail if the cache has no data nodes at all, otherwise ask for a retry.
            if (F.isEmpty(dataNodes(cctx.name(), NONE)))
                throw new CacheException("Failed to find data nodes [cache=" + cctx.name() + ", part=" + part + "]");

            return null;
        }

        if (hasExtra) {
            // Narrow candidates of each partition down to nodes owning it in every partitioned cache,
            // this is what provides logical collocation between partitioned caches with the same affinity.
            for (String spaceName : extraSpaces) {
                GridCacheContext<?,?> other = cacheContext(spaceName);

                if (other.isReplicated() || other.isLocal())
                    continue;

                int otherParts = other.affinity().partitions();

                for (int part = 0; part < otherParts; part++) {
                    List<ClusterNode> partOwners = other.topology().owners(part);

                    if (F.isEmpty(partOwners)) {
                        if (F.isEmpty(dataNodes(spaceName, NONE)))
                            throw new CacheException("Failed to find data nodes [cache=" + spaceName + ", part=" + part + "]");

                        return null; // The cache has data nodes, but the partition has no owner yet -> retry.
                    }

                    Set<ClusterNode> common = ownersByPart[part];

                    if (common == null) {
                        ownersByPart[part] = new HashSet<>(partOwners);

                        continue;
                    }

                    common.retainAll(partOwners);

                    if (common.isEmpty())
                        return null; // No node owns the partition in all the caches -> retry.
                }
            }

            // Drop the nodes which are not data nodes for some of the replicated caches.
            for (String spaceName : extraSpaces) {
                GridCacheContext<?,?> other = cacheContext(spaceName);

                if (other.isReplicated()) {
                    Set<ClusterNode> replNodes = replicatedUnstableDataNodes(other);

                    if (F.isEmpty(replNodes))
                        return null; // Retry.

                    for (Set<ClusterNode> common : ownersByPart) {
                        common.retainAll(replNodes);

                        if (common.isEmpty())
                            return null; // Retry.
                    }
                }
            }
        }

        // Pick a single node for each partition and group partitions by the picked node.
        Map<ClusterNode, IntArray> mapping = new HashMap<>();

        // Partitions are visited in ascending order, so each resulting IntArray is sorted, this is important.
        for (int part = 0; part < ownersByPart.length; part++) {
            Set<ClusterNode> candidates = ownersByPart[part];

            assert !F.isEmpty(candidates) : candidates;

            ClusterNode chosen;

            if (candidates.size() == 1)
                chosen = F.first(candidates);
            else
                chosen = F.rand(candidates);

            IntArray assigned = mapping.get(chosen);

            if (assigned == null) {
                assigned = new IntArray();

                mapping.put(chosen, assigned);
            }

            assigned.add(part);
        }

        return mapping;
    }

    /**
     * Collects all the given spaces except the main one.
     *
     * @param mainSpace Main space.
     * @param allSpaces All spaces.
     * @return List of all extra spaces or {@code null} if none.
     */
    private List<String> extraSpaces(String mainSpace, Set<String> allSpaces) {
        if (F.isEmpty(allSpaces))
            return null;

        // The only space is the main one, nothing extra.
        if (allSpaces.size() == 1 && allSpaces.contains(mainSpace))
            return null;

        List<String> extras = new ArrayList<>(allSpaces.size());

        for (String name : allSpaces) {
            if (F.eq(name, mainSpace))
                continue;

            extras.add(name);
        }

        return extras;
    }

    /**
     * Collects the plans of all map queries followed by the plan of the reduce query.
     *
     * @param c Connection.
     * @param space Space.
     * @param qry Query.
     * @return Cursor for plans, each element is a single-value row holding one plan.
     * @throws IgniteCheckedException if failed.
     */
    private Iterator<List<?>> explainPlan(JdbcConnection c, String space, GridCacheTwoStepQuery qry)
        throws IgniteCheckedException {
        List<List<?>> plans = new ArrayList<>();

        int mapQryCnt = qry.mapQueries().size();

        // One plan row is read from the table corresponding to each map query.
        for (int idx = 0; idx < mapQryCnt; idx++) {
            String planSql = "SELECT PLAN FROM " + table(idx);

            ResultSet mapPlanRs = h2.executeSqlQueryWithTimer(space, c, planSql, null, false);

            plans.add(F.asList(getPlan(mapPlanRs)));
        }

        // Replace inner tables of the fake tables with merge tables built from the map queries.
        int tblIdx = 0;

        for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
            GridMergeTable mergeTbl = createMergeTable(c, mapQry, false);

            fakeTable(c, tblIdx).setInnerTable(mergeTbl);

            tblIdx++;
        }

        GridCacheSqlQuery rdcQry = qry.reduceQuery();

        String explainSql = "EXPLAIN " + rdcQry.query();

        ResultSet rdcPlanRs = h2.executeSqlQueryWithTimer(space, c, explainSql, F.asList(rdcQry.parameters()), false);

        plans.add(F.asList(getPlan(rdcPlanRs)));

        return plans.iterator();
    }

    /**
     * Reads the plan from the first column of the first row of the given result set.
     *
     * @param rs Result set.
     * @return Plan.
     * @throws IgniteCheckedException If failed to read from the result set.
     * @throws IllegalStateException If the result set contains no rows.
     */
    private String getPlan(ResultSet rs) throws IgniteCheckedException {
        try {
            if (!rs.next())
                throw new IllegalStateException("Failed to get query plan: result set is empty.");

            return rs.getString(1);
        }
        catch (SQLException e) {
            throw new IgniteCheckedException(e);
        }
    }

    /**
     * Sends the message to all the given nodes. A failure to send to one remote node is logged
     * and does not prevent sending to the remaining nodes.
     *
     * @param nodes Nodes.
     * @param msg Message.
     * @param partsMap Partitions.
     * @return {@code true} If all messages sent successfully.
     */
    private boolean send(
        Collection<ClusterNode> nodes,
        Message msg,
        Map<ClusterNode,IntArray> partsMap
    ) {
        boolean sendToLoc = false;
        boolean allSent = true;

        for (ClusterNode target : nodes) {
            if (!target.isLocal()) {
                try {
                    ctx.io().send(target, GridTopic.TOPIC_QUERY, copy(msg, target, partsMap), QUERY_POOL);
                }
                catch (IgniteCheckedException e) {
                    allSent = false;

                    U.warn(log, e.getMessage());
                }
            }
            else
                sendToLoc = true; // Postponed until all remote sends are done.
        }

        // Local node goes the last to allow parallel execution.
        if (sendToLoc)
            h2.mapQueryExecutor().onMessage(ctx.localNodeId(), copy(msg, ctx.discovery().localNode(), partsMap));

        return allSent;
    }

    /**
     * Creates a per-node copy of the request with the partitions mapped to that node.
     * If there is no partitions map the message itself is returned.
     *
     * @param msg Message to copy.
     * @param node Node.
     * @param partsMap Partitions map.
     * @return Copy of message with partitions set.
     */
    private Message copy(Message msg, ClusterNode node, Map<ClusterNode,IntArray> partsMap) {
        if (partsMap != null) {
            GridQueryRequest req = new GridQueryRequest((GridQueryRequest)msg);

            IntArray nodeParts = partsMap.get(node);

            assert nodeParts != null : node;

            int[] arr = new int[nodeParts.size()];

            nodeParts.toArray(arr);

            req.partitions(arr);

            return req;
        }

        return msg;
    }

    /**
     * Creates a merge table with either the columns of the given query or the single plan column.
     * The connection is closed quietly if the table can not be created.
     *
     * @param conn Connection.
     * @param qry Query.
     * @param explain Explain.
     * @return Table.
     * @throws IgniteCheckedException If failed.
     */
    private GridMergeTable createMergeTable(JdbcConnection conn, GridCacheSqlQuery qry, boolean explain)
        throws IgniteCheckedException {
        try {
            Session ses = (Session)conn.getSession();

            CreateTableData tblData = new CreateTableData();

            tblData.tableName = "T___";
            tblData.schema = ses.getDatabase().getSchema(ses.getCurrentSchemaName());
            tblData.create = true;

            if (explain)
                tblData.columns = planColumns();
            else {
                LinkedHashMap<String,?> colsMap = qry.columns();

                assert colsMap != null;

                ArrayList<Column> tblCols = new ArrayList<>(colsMap.size());

                // Map keys are column aliases, map values are column types.
                for (Map.Entry<String,?> entry : colsMap.entrySet()) {
                    String alias = entry.getKey();
                    GridSqlType type = (GridSqlType)entry.getValue();

                    assert !F.isEmpty(alias);

                    tblCols.add(new Column(alias, type.type(), type.precision(), type.scale(), type.displaySize()));
                }

                tblData.columns = tblCols;
            }

            return new GridMergeTable(tblData, ctx);
        }
        catch (Exception e) {
            U.closeQuiet(conn);

            throw new IgniteCheckedException(e);
        }
    }

    /**
     * @return Columns of a plan table: the single string column {@code PLAN}.
     */
    private static ArrayList<Column> planColumns() {
        Column planCol = new Column("PLAN", Value.STRING);

        ArrayList<Column> cols = new ArrayList<>(1);

        cols.add(planCol);

        return cols;
    }

    /**
     * Notifies all the registered query runs that the client node has disconnected.
     *
     * @param reconnectFut Reconnect future.
     */
    public void onDisconnected(IgniteFuture<?> reconnectFut) {
        IgniteClientDisconnectedException cause =
            new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected.");

        CacheException err = new CacheException("Query was cancelled, client node disconnected.", cause);

        // The same error instance is passed to every run.
        for (QueryRun run : runs.values())
            run.disconnected(err);
    }

    /**
     * Holder for the resources of a single query run and for its fail state.
     */
    private static class QueryRun {
        /** Merge indexes which get failed when the fail state is set. */
        private List<GridMergeIndex> idxs;

        /** Latch which gets fully counted down when the fail state is set. */
        private CountDownLatch latch;

        /** */
        private JdbcConnection conn;

        /** */
        private int pageSize;

        /** Can be either CacheException in case of error or AffinityTopologyVersion to retry if needed. */
        private final AtomicReference<Object> state = new AtomicReference<>();

        /**
         * Sets the fail state if it was not set yet, otherwise does nothing.
         *
         * @param o Fail state object.
         * @param nodeId Node ID.
         */
        void state(Object o, @Nullable UUID nodeId) {
            assert o != null;
            assert o instanceof CacheException || o instanceof AffinityTopologyVersion : o.getClass();

            // Only the first caller wins, the state is never overwritten.
            if (state.compareAndSet(null, o)) {
                releaseLatch();

                for (GridMergeIndex idx : idxs) // Fail all merge indexes.
                    idx.fail(nodeId);
            }
        }

        /**
         * Sets the given error as the fail state if the state was not set yet, otherwise does nothing.
         *
         * @param e Error.
         */
        void disconnected(CacheException e) {
            if (state.compareAndSet(null, e)) {
                releaseLatch();

                for (GridMergeIndex idx : idxs) // Fail all merge indexes.
                    idx.fail(e);
            }
        }

        /**
         * Counts the latch down to zero, we don't need to wait for all nodes to reply.
         */
        private void releaseLatch() {
            while (latch.getCount() != 0)
                latch.countDown();
        }
    }

    /**
     * Result set iterator which returns each row as a list of its values.
     */
    private static class Iter extends GridH2ResultSetIterator<List<?>> {
        /** */
        private static final long serialVersionUID = 0L;

        /**
         * @param data Data array.
         * @throws IgniteCheckedException If failed.
         */
        protected Iter(ResultSet data) throws IgniteCheckedException {
            super(data, true);
        }

        /** {@inheritDoc} */
        @Override protected List<?> createRow() {
            List<Object> vals = new ArrayList<>(row.length);

            // Copy the values of the current row in their original order.
            for (Object val : row)
                vals.add(val);

            return vals;
        }
    }
}
